Amazon EventBridge Pipesを使ってプロデューサー/コンシューマー型メッセージ処理のパイプラインを簡略化しよう #reinvent
re:Invent 2022で発表されたAmazon EventBridge PipesはProducer/Consumer型のメッセージング処理をメッセージブローカーやワーカーに関係なく一貫した手法で実装するサービスです。
メッセージブローカーにAmazon SQS、ConsumerのワーカーにLambdaを利用する構成で、従来のLambdaイベントソース型とEventBridge Pipes型を比較しましょう。
Producer/Consumer型メッセージ処理
Producer/Consumer型メッセージは、メッセージキューなどで利用される、非同期のメッセージングパターンです。Producerはブローカーにメッセージを送信し、Consumerはブローカーをポーリングしてメッセージを受信します。
SQS-Lambda構成の場合、SQSがメッセージブローカー、LambdaがConsumerに相当します。
以下では Producerのことは一旦忘れてブローカーとConsumer間の処理にフォーカスし、2006年から本稼働しているSQSのConsumer実装が、AWSの進化とともに疎結合になる姿を確認します。
1. ナイーブ実装
SQSキューのメッセージをSQSのAPIを使って処理すると、以下の流れで行います。
- SQSキュー内をポーリング
- メッセージを受信
- メッセージを処理
- メッセージをキューから削除
import boto3 sqs = boto3.client('sqs') SQS_QUEUE_URL = 'https://sqs.eu-central-1.amazonaws.com/123456789012/bar' response = sqs.receive_message( QueueUrl = SQS_QUEUE_URL, ) for message in response.get("Messages", []): # メッセージの受信 print(message["Body"]) # 処理 sqs.delete_message( QueueUrl = SQS_QUEUE_URL, ReceiptHandle = message['ReceiptHandle'] ) # メッセージの削除
コンシューマーはメッセージ処理(ビジネスロジック)に注力したいのに
- ポーリング
- メッセージの受信
- メッセージの削除
なども必要です。
これらの手間を省くのが、次のLambdaイベントソース型です。
2. Lambdaイベントソース型
Lambdaのイベントソース(トリガー)にSQSを指定します。
イベントソース・コンポーネントが
- ポーリング
- メッセージの受信
- メッセージの削除
を行うため、Lambda関数は、メッセージを処理するだけですみます。
def lambda_handler(event, context): for message in event['Records']: print(message['body'])
CloudFormationテンプレート (ここをクリックしてください)
Resources: QueueForLambda: Type: 'AWS::SQS::Queue' LambdaServiceRole: Type: 'AWS::IAM::Role' Properties: AssumeRolePolicyDocument: Statement: - Action: 'sts:AssumeRole' Effect: Allow Principal: Service: lambda.amazonaws.com Version: '2012-10-17' ManagedPolicyArns: - 'Fn::Join': - '' - - 'arn:' - Ref: 'AWS::Partition' - ':iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' LambdaServiceRoleDefaultPolicy: Type: 'AWS::IAM::Policy' Properties: PolicyDocument: Statement: - Action: - 'sqs:ChangeMessageVisibility' - 'sqs:DeleteMessage' - 'sqs:GetQueueAttributes' - 'sqs:GetQueueUrl' - 'sqs:ReceiveMessage' Effect: Allow Resource: 'Fn::GetAtt': - QueueForLambda - Arn Version: '2012-10-17' PolicyName: LambdaServiceRoleDefaultPolicy Roles: - Ref: LambdaServiceRole LambdaForSQS: Type: 'AWS::Lambda::Function' Properties: Code: ZipFile: |- def handler(event, context): for record in event['Records']: print(record['body']) Handler: index.handler Role: 'Fn::GetAtt': - LambdaServiceRole - Arn Runtime: python3.11 DependsOn: - LambdaServiceRoleDefaultPolicy - LambdaServiceRole LambdaSqsEventSourceSqs: Type: 'AWS::Lambda::EventSourceMapping' Properties: EventSourceArn: 'Fn::GetAtt': - QueueForLambda - Arn FunctionName: Ref: LambdaForSQS
Lambdaの実行ロールには、SQS操作用の
sqs:ReceiveMessage
sqs:DeleteMessage
sqs:GetQueueAttributes
といったアクションも許可します。
ブローカーとコンシューマーは明確に分離されていますが、コンシューマーのすべての処理はLambdaに寄せられています。
コンシューマーからメッセージ処理以外を引き剥がしたのが、次のEventBridge Pipes型です。
3. EventBridge Pipes型
EventBridge Pipesがブローカーとターゲットを仲介し、コンシューマーの処理に必要な
- Source(ブローカーとのポーリング)
- Filter(フィルタリング)
- Enrichment(プリプロセス)
- Target(ターゲットをメッセージとともに呼び出す)
のパイプライン処理を行います。
SourceにはSQS、TargetにはLambdaを指定します。
def lambda_handler(event, context): for message in event: print(message['body'])
CloudFormationテンプレート (ここをクリックしてください)
--- Resources: QueueForPipe: Type: AWS::SQS::Queue LambdaServiceRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Statement: - Action: sts:AssumeRole Effect: Allow Principal: Service: lambda.amazonaws.com Version: '2012-10-17' ManagedPolicyArns: - Fn::Join: - '' - - 'arn:' - Ref: AWS::Partition - ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" LambdaForPipe: Type: AWS::Lambda::Function Properties: Code: ZipFile: |- def handler(event, context): for record in event: print(record['body']) Handler: index.handler Role: Fn::GetAtt: - LambdaServiceRole - Arn Runtime: python3.11 DependsOn: - LambdaServiceRole PipeRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Statement: - Action: sts:AssumeRole Effect: Allow Principal: Service: pipes.amazonaws.com Version: '2012-10-17' PipeRoleDefaultPolicy: Type: AWS::IAM::Policy Properties: PolicyDocument: Statement: - Action: - sqs:ChangeMessageVisibility - sqs:DeleteMessage - sqs:GetQueueAttributes - sqs:GetQueueUrl - sqs:ReceiveMessage Effect: Allow Resource: Fn::GetAtt: - QueueForPipe - Arn - Action: lambda:InvokeFunction Effect: Allow Resource: - Fn::GetAtt: - LambdaForPipe - Arn - Fn::Join: - '' - - Fn::GetAtt: - LambdaForPipe - Arn - ":*" Version: '2012-10-17' PolicyName: PipeRoleDefaultPolicy Roles: - Ref: PipeRole Pipe: Type: AWS::Pipes::Pipe Properties: RoleArn: Fn::GetAtt: - PipeRole - Arn Source: Fn::GetAtt: - QueueForPipe - Arn Target: Fn::GetAtt: - LambdaForPipe - Arn TargetParameters: LambdaFunctionParameters: InvocationType: REQUEST_RESPONSE
ブローカーとポーリングし、取得したメッセージでLambdaを呼び出すのはPipesのため、Pipesの実行ロールには、
- ソース操作用
sqs:ReceiveMessage
sqs:DeleteMessage
sqs:GetQueueAttributes
- ターゲット操作用
lambda:InvokeFunction
といったアクションを許可します。
Lambda関数の責務は受け取ったメッセージを処理するだけのため、ブローカーを意識する必要はなく、ブローカーを操作するポリシーも不要です。
SQSとPipesとLambdaが疎結合になりました。
最後に
SQS-Lambda構成に限定すると、LambdaのイベントソースマッピングがEventBridge Pipesに置き換わっただけです。
もう一段上のProducer/Consumer型メッセージングサービスを利用している観点から考えると、ブローカー(SQSなど)やワーカー(Lambdaなど)に関係なく
- Source(ブローカーとポーリング)
- Filter(フィルタリング)
- Enrichment(プリプロセス)
- Target(メッセージとともにTargetを呼び出す)
というパイプラインをEventBridge Pipesで抽象化できます。
※ 図は公式ドキュメントから引用
さらに、ブローカーを操作するポリシーはTargetではなくEventBridge Pipesに付与することで、各コンポーネントの責任範囲が明確になりました。
これがEventBridge Pipesの狙いです。
運用しているブローカー・コンシューマーが複雑になるにつれ、EventBridge Pipesを利用し、責任範囲を明確にして、一貫したパイプラインでメッセージを処理するメリットが増えると思います。
AWS Japan SA によるServerlessDays Tokyo 2022 での発表資料も合わせて参照ください。
それでは。